{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Workflows cookbook: walking through all features of Workflows\n",
    "\n",
    "First, we install our dependencies. `llama-index-core` contains most of what we need, `llama-index-llms-openai` handles LLM access, and `llama-index-utils-workflow` provides the visualization capabilities we'll use later on."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install --upgrade llama-index-core llama-index-llms-openai llama-index-utils-workflow"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then we bring in the dependencies we just installed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core.workflow import (\n",
    "    Event,\n",
    "    StartEvent,\n",
    "    StopEvent,\n",
    "    Workflow,\n",
    "    step,\n",
    "    Context,\n",
    ")\n",
    "import random\n",
    "from llama_index.utils.workflow import draw_all_possible_flows\n",
    "from llama_index.utils.workflow import draw_most_recent_execution\n",
    "from llama_index.llms.openai import OpenAI"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Set up our OpenAI key, so we can do actual LLM things."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "os.environ[\"OPENAI_API_KEY\"] = \"sk-proj-...\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Workflow basics\n",
    "\n",
    "Let's start with the most basic possible workflow: it just starts, does one thing, and stops. There's no reason to have a real workflow if your task is this simple, but we're just demonstrating how they work."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "LlamaIndex, formerly known as GPT Index, is a data framework designed to facilitate the connection between large language models (LLMs) and external data sources. It provides tools to index various data types, such as documents, databases, and APIs, enabling LLMs to interact with and retrieve information from these sources more effectively. The framework supports the creation of indices that can be queried by LLMs, enhancing their ability to access and utilize external data in a structured manner. This capability is particularly useful for applications requiring the integration of LLMs with specific datasets or knowledge bases.\n"
     ]
    }
   ],
   "source": [
    "from llama_index.llms.openai import OpenAI\n",
    "\n",
    "\n",
    "class OpenAIGenerator(Workflow):\n",
    "    @step\n",
    "    async def generate(self, ev: StartEvent) -> StopEvent:\n",
    "        llm = OpenAI(model=\"gpt-4o\")\n",
    "        response = await llm.acomplete(ev.query)\n",
    "        return StopEvent(result=str(response))\n",
    "\n",
    "\n",
    "w = OpenAIGenerator(timeout=10, verbose=False)\n",
    "result = await w.run(query=\"What's LlamaIndex?\")\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One of the neat things about Workflows is that we can use pyvis to visualize them. Let's see what that looks like for this very simple flow."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "draw_all_possible_flows(OpenAIGenerator, filename=\"trivial_workflow.html\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![Screenshot 2024-08-05 at 11.59.03 AM.png]()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Not a lot to see here, yet! The start event goes to generate() and then straight to StopEvent.\n",
    "\n",
    "## Loops and branches\n",
    "\n",
    "Let's go to a more interesting example, demonstrating our ability to loop:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class FailedEvent(Event):\n",
    "    error: str\n",
    "\n",
    "\n",
    "class QueryEvent(Event):\n",
    "    query: str\n",
    "\n",
    "\n",
    "class LoopExampleFlow(Workflow):\n",
    "    @step\n",
    "    async def answer_query(\n",
    "        self, ev: StartEvent | QueryEvent\n",
    "    ) -> FailedEvent | StopEvent:\n",
    "        query = ev.query\n",
    "        # try to answer the query\n",
    "        random_number = random.randint(0, 1)\n",
    "        if random_number == 0:\n",
    "            return FailedEvent(error=\"Failed to answer the query.\")\n",
    "        else:\n",
    "            return StopEvent(result=\"The answer to your query\")\n",
    "\n",
    "    @step\n",
    "    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:\n",
    "        # improve the query or decide it can't be fixed\n",
    "        random_number = random.randint(0, 1)\n",
    "        if random_number == 0:\n",
    "            return QueryEvent(query=\"Here's a better query.\")\n",
    "        else:\n",
    "            return StopEvent(result=\"Your query can't be fixed.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We're using random numbers to simulate LLM actions here so that we can get reliably interesting behavior.\n",
    "\n",
    "answer_query() accepts a StartEvent or a QueryEvent. It can then do 2 things:\n",
    "* it can answer the query and emit a StopEvent, which returns the result\n",
    "* it can decide the query was bad and emit a FailedEvent\n",
    "\n",
    "improve_query() accepts a FailedEvent. It can also do 2 things:\n",
    "* it can decide the query can't be improved and emit a StopEvent, which returns failure\n",
    "* it can present a better query and emit a QueryEvent, which creates a loop back to answer_query()"
   ]
  },
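  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the routing rule described above concrete, here is a minimal plain-Python sketch of how events can be dispatched to handlers based on their type annotations. It is an illustration only, not the library's implementation: the `Start`, `Query`, `Failed` and `Stop` classes, the `accepted_types()`, `build_routes()` and `run_sketch()` helpers, and the use of plain functions instead of async steps are all assumptions made to keep the example self-contained. The point is that the type of the emitted event alone decides which handler runs next, which is why returning a `Query` from `improve_query` loops back to `answer_query`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "import typing\n",
    "from dataclasses import dataclass\n",
    "\n",
    "\n",
    "# Hypothetical stand-ins for StartEvent, QueryEvent, FailedEvent and StopEvent.\n",
    "@dataclass\n",
    "class Start:\n",
    "    query: str\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class Query:\n",
    "    query: str\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class Failed:\n",
    "    error: str\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class Stop:\n",
    "    result: str\n",
    "\n",
    "\n",
    "def answer_query(ev: Start | Query) -> Failed | Stop:\n",
    "    # Randomly fail or answer, like the step above.\n",
    "    if random.randint(0, 1) == 0:\n",
    "        return Failed(error=\"Failed to answer the query.\")\n",
    "    return Stop(result=\"The answer to your query\")\n",
    "\n",
    "\n",
    "def improve_query(ev: Failed) -> Query | Stop:\n",
    "    # Randomly retry with a new query or give up.\n",
    "    if random.randint(0, 1) == 0:\n",
    "        return Query(query=\"Here's a better query.\")\n",
    "    return Stop(result=\"Your query can't be fixed.\")\n",
    "\n",
    "\n",
    "def accepted_types(func):\n",
    "    # Read the annotation of the `ev` parameter; a union yields several types.\n",
    "    hint = typing.get_type_hints(func)[\"ev\"]\n",
    "    return typing.get_args(hint) or (hint,)\n",
    "\n",
    "\n",
    "def build_routes(handlers):\n",
    "    # Map every accepted event type to the handler that consumes it.\n",
    "    return {t: func for func in handlers for t in accepted_types(func)}\n",
    "\n",
    "\n",
    "def run_sketch(handlers, event):\n",
    "    routes = build_routes(handlers)\n",
    "    trace = []\n",
    "    # Keep dispatching until a handler emits Stop.\n",
    "    while not isinstance(event, Stop):\n",
    "        handler = routes[type(event)]\n",
    "        trace.append(handler.__name__)\n",
    "        event = handler(event)\n",
    "    return event.result, trace\n",
    "\n",
    "\n",
    "result, trace = run_sketch(\n",
    "    [answer_query, improve_query], Start(query=\"What's LlamaIndex?\")\n",
    ")\n",
    "print(trace)\n",
    "print(result)"
   ]
  },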
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can also visualize this more complicated workflow:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "loop_workflow.html\n"
     ]
    }
   ],
   "source": [
    "draw_all_possible_flows(LoopExampleFlow, filename=\"loop_workflow.html\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![Screenshot 2024-08-05 at 11.36.05 AM.png]()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
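    "Now let's run it. We set `verbose=True` so we can see exactly which events are triggered. Because each step picks its outcome at random, the path changes from run to run: in the run shown below, answer_query() fails and improve_query() decides the query can't be fixed, while other runs will loop back to answer_query() before stopping."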
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running step answer_query\n",
      "Step answer_query produced event FailedEvent\n",
      "Running step improve_query\n",
      "Step improve_query produced event StopEvent\n",
      "Your query can't be fixed.\n"
     ]
    }
   ],
   "source": [
    "l = LoopExampleFlow(timeout=10, verbose=True)\n",
    "result = await l.run(query=\"What's LlamaIndex?\")\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Maintaining state between events\n",
    "\n",
    "The `Context` object holds a global state store, `ctx.store`, which lets you keep arbitrary data or functions around for use by all steps."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class GlobalExampleFlow(Workflow):\n",
    "    @step\n",
    "    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:\n",
    "        # load our data here\n",
    "        await ctx.store.set(\"some_database\", [\"value1\", \"value2\", \"value3\"])\n",
    "\n",
    "        return QueryEvent(query=ev.query)\n",
    "\n",
    "    @step\n",
    "    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:\n",
    "        # use our data with our query\n",
    "        data = await ctx.store.get(\"some_database\")\n",
    "\n",
    "        result = f\"The answer to your query is {data[1]}\"\n",
    "        return StopEvent(result=result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running step setup\n",
      "Step setup produced event QueryEvent\n",
      "Running step query\n",
      "Step query produced event StopEvent\n",
      "The answer to your query is value2\n"
     ]
    }
   ],
   "source": [
    "g = GlobalExampleFlow(timeout=10, verbose=True)\n",
    "result = await g.run(query=\"What's LlamaIndex?\")\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Of course, this flow is essentially still linear. A more realistic example would be a start event that can be either a query or a data population event, where a query has to wait until data has been loaded. Let's set that up to see what it looks like:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class WaitExampleFlow(Workflow):\n",
    "    @step\n",
    "    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:\n",
    "        if hasattr(ev, \"data\"):\n",
    "            await ctx.store.set(\"data\", ev.data)\n",
    "\n",
    "        return StopEvent(result=None)\n",
    "\n",
    "    @step\n",
    "    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None:\n",
    "        if hasattr(ev, \"query\"):\n",
    "            # do we have any data? look in the context store, where setup() put it\n",
    "            data = await ctx.store.get(\"data\", default=None)\n",
    "            if data is not None:\n",
    "                return StopEvent(result=f\"Got the data {data}\")\n",
    "            else:\n",
    "                # there's no data yet\n",
    "                return None\n",
    "        else:\n",
    "            # this isn't a query\n",
    "            return None"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running step query\n",
      "Step query produced no event\n",
      "Running step setup\n",
      "Step setup produced event StopEvent\n",
      "No you can't\n",
      "---\n",
      "Running step query\n",
      "Step query produced no event\n",
      "Running step setup\n",
      "Step setup produced event StopEvent\n",
      "---\n",
      "Running step query\n",
      "Step query produced event StopEvent\n",
      "Running step setup\n",
      "Step setup produced event StopEvent\n",
      "Got the data Yes you can\n"
     ]
    }
   ],
   "source": [
    "w = WaitExampleFlow(verbose=True)\n",
    "# reuse one Context so that stored data persists between runs\n",
    "ctx = Context(w)\n",
    "result = await w.run(ctx=ctx, query=\"Can I kick it?\")\n",
    "if result is None:\n",
    "    print(\"No you can't\")\n",
    "print(\"---\")\n",
    "result = await w.run(ctx=ctx, data=\"Yes you can\")\n",
    "print(\"---\")\n",
    "result = await w.run(ctx=ctx, query=\"Can I kick it?\")\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's visualize how this flow works:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "wait_workflow.html\n"
     ]
    }
   ],
   "source": [
    "draw_all_possible_flows(WaitExampleFlow, filename=\"wait_workflow.html\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![Screenshot 2024-08-05 at 1.37.23 PM.png]()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Waiting for one or more events\n",
    "\n",
    "Because waiting for events is such a common pattern, the context object has a convenience function, `collect_events()`. It buffers each event it is given and returns `None` until every required event type has arrived. It then returns the collected events as a list, in the order the types were specified. Let's see this in action:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class InputEvent(Event):\n",
    "    input: str\n",
    "\n",
    "\n",
    "class SetupEvent(Event):\n",
    "    error: bool\n",
    "\n",
    "\n",
    "class QueryEvent(Event):\n",
    "    query: str\n",
    "\n",
    "\n",
    "class CollectExampleFlow(Workflow):\n",
    "    @step\n",
    "    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:\n",
    "        # generically start everything up\n",
    "        if not hasattr(self, \"setup\") or not self.setup:\n",
    "            self.setup = True\n",
    "            print(\"I got set up\")\n",
    "        return SetupEvent(error=False)\n",
    "\n",
    "    @step\n",
    "    async def collect_input(self, ev: StartEvent) -> InputEvent | None:\n",
    "        if hasattr(ev, \"input\"):\n",
    "            # perhaps validate the input\n",
    "            print(\"I got some input\")\n",
    "            return InputEvent(input=ev.input)\n",
    "\n",
    "    @step\n",
    "    async def parse_query(self, ev: StartEvent) -> QueryEvent | None:\n",
    "        if hasattr(ev, \"query\"):\n",
    "            # parse the query in some way\n",
    "            print(\"I got a query\")\n",
    "            return QueryEvent(query=ev.query)\n",
    "\n",
    "    @step\n",
    "    async def run_query(\n",
    "        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent\n",
    "    ) -> StopEvent | None:\n",
    "        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])\n",
    "        if ready is None:\n",
    "            print(\"Not enough events yet\")\n",
    "            return None\n",
    "\n",
    "        # run the query\n",
    "        print(\"Now I have all the events\")\n",
    "        print(ready)\n",
    "\n",
    "        result = f\"Ran query '{ready[0].query}' on input '{ready[1].input}'\"\n",
    "        return StopEvent(result=result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "I got some input\n",
      "I got a query\n",
      "Not enough events yet\n",
      "Not enough events yet\n",
      "Now I have all the events\n",
      "[QueryEvent(query=\"Here's my question\"), InputEvent(input=\"Here's some input\"), SetupEvent(error=False)]\n",
      "Ran query 'Here's my question' on input 'Here's some input'\n"
     ]
    }
   ],
   "source": [
    "c = CollectExampleFlow()\n",
    "result = await c.run(input=\"Here's some input\", query=\"Here's my question\")\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can see each of the events getting triggered, as well as `run_query` repeatedly printing \"Not enough events yet\" because `collect_events()` returned `None` until all three events had arrived. Let's see what this looks like in a flow diagram:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "collect_workflow.html\n"
     ]
    }
   ],
   "source": [
    "draw_all_possible_flows(CollectExampleFlow, \"collect_workflow.html\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![Screenshot 2024-08-05 at 2.27.46 PM.png]()"
   ]
  },
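  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the buffering behavior of `collect_events()` more tangible, here is a small plain-Python sketch of the idea: store each incoming event, return `None` while any expected type is missing, then hand back the events in the requested order. This is not the library's implementation. The `EventBuffer` class, its `collect()` method and the `QueryMsg`, `InputMsg` and `SetupMsg` classes are hypothetical names invented for this illustration, and keeping surplus events for later is an assumption of the sketch."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dataclasses import dataclass\n",
    "\n",
    "\n",
    "# Hypothetical stand-ins for the QueryEvent, InputEvent and SetupEvent classes.\n",
    "@dataclass\n",
    "class QueryMsg:\n",
    "    query: str\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class InputMsg:\n",
    "    input: str\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class SetupMsg:\n",
    "    error: bool\n",
    "\n",
    "\n",
    "class EventBuffer:\n",
    "    # Buffers events and releases them once every expected type has arrived.\n",
    "    def __init__(self):\n",
    "        self._pending = []\n",
    "\n",
    "    def collect(self, ev, expected):\n",
    "        self._pending.append(ev)\n",
    "        remaining = list(self._pending)\n",
    "        ordered = []\n",
    "        for event_type in expected:\n",
    "            match = next((e for e in remaining if type(e) is event_type), None)\n",
    "            if match is None:\n",
    "                return None  # still waiting for at least one event type\n",
    "            remaining.remove(match)\n",
    "            ordered.append(match)\n",
    "        self._pending = remaining  # keep any surplus events for later\n",
    "        return ordered\n",
    "\n",
    "\n",
    "buffer = EventBuffer()\n",
    "expected = [QueryMsg, InputMsg, SetupMsg]\n",
    "print(buffer.collect(InputMsg(input=\"some input\"), expected))  # None\n",
    "print(buffer.collect(SetupMsg(error=False), expected))  # None\n",
    "ready = buffer.collect(QueryMsg(query=\"my question\"), expected)\n",
    "# events come back in the order of `expected`, not in arrival order\n",
    "print([type(e).__name__ for e in ready])"
   ]
  },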
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This concludes our tour of creating, running and visualizing workflows! Check out the [docs](https://docs.llamaindex.ai/en/stable/module_guides/workflow/) and [examples](https://docs.llamaindex.ai/en/stable/examples/workflow/function_calling_agent/) to learn more."
   ]
  }
 ],
 "metadata": {
  "colab": {
   "provenance": []
  },
  "kernelspec": {
   "display_name": "Python 3",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
